查看原文
其他

​单机下如何更快更准确查询Parquet?

alitrack alitrack 2022-09-22

TLDR:DuckDB 是一个免费的开源分析数据管理系统,可以直接对 Parquet 文件运行 SQL 查询,并自动利用 Parquet 格式的高级功能。

Apache Parquet 是最常见的用于分析的“大数据”存储格式。在 Parquet 文件中,数据以列压缩的二进制格式存储。每个 Parquet 文件存储一个表。表被划分为行组,每个行组包含表行的一个子集。在行组内,表数据以列方式存储。

Parquet 格式具有许多适用于分析用例的属性:

  1. 列式表示意味着可以(有效地)读取各个列。无需总是读取整个文件!
  2. 该文件包含每个行组中的每列统计信息(最小值/最大值,以及值的数量NULL)。如果不需要,这些统计信息允许读者跳过行组。
  3. 列压缩显着减小了格式的文件大小,从而降低了数据集的存储需求。这通常可以将大数据转化为中型数据。

DuckDB 和 Parquet

DuckDB 的零依赖 Parquet reader 能够直接对 Parquet 文件执行 SQL 查询,无需任何导入或分析步骤。由于 Parquet 的自然列式格式,这非常快!

DuckDB 将以流式方式读取 Parquet 文件,这意味着您可以对不适合常驻内存的大型 Parquet 文件执行查询。

DuckDB 能够自动检测任何给定查询需要哪些列和行。这允许用户分析更大、更复杂的 Parquet 文件,而无需执行手动优化或投入更多硬件。

作为额外的好处,DuckDB 能够使用 glob 语法使用并行处理和同时处理多个 Parquet 文件来完成所有这些。

作为一个简短的预告,这里有一个代码片段,它允许您直接在 Parquet 文件之上运行 SQL 查询。

# to install: pip install duckdb
# to download the parquet file:
# wget https://github.com/cwida/duckdb-data/releases/download/v1.0/taxi_2019_04.parquet
import duckdb

print(duckdb.query('''
SELECT COUNT(*)
FROM 'taxi_2019_04.parquet'
WHERE pickup_at BETWEEN '2019-04-15' AND '2019-04-20'
'''
).fetchall())

自动过滤器和下推

让我们深入研究前面的查询,以更好地了解 Parquet 格式与 DuckDB 的查询优化器结合使用时的强大功能。

SELECT COUNT(*)
FROM 'taxi_2019_04.parquet'
WHERE pickup_at BETWEEN '2019-04-15' AND '2019-04-20'

在此查询中,我们从 Parquet 文件 ( pickup_at) 中读取单个列。Parquet 文件中存储的任何其他列都可以完全跳过,因为我们不需要它们来回答我们的查询。

过滤下推到parquet文件示例。

此外,只有pickup_at2019 年 4 月 15 日到 20 日之间的行才会影响查询结果。可以跳过不满足此谓词的任何行。

在这里我们可以充分利用 Parquet 文件中的统计数据。可以跳过最大值pickup_at低于2019-04-15或最小值高于 的任何行组2019-04-20。在某些情况下,这允许我们跳过读取整个文件。

DuckDB 与 Pandas

为了说明这些自动优化的效果,我们将使用 Pandas 和 DuckDB 在 Parquet 文件之上运行许多查询。

在这些查询中,我们使用存储为 Parquet 文件的纽约出租车数据集的一部分,特别是 2019 年 4 月、5 月和 6 月的数据。这些文件大约是。总共 360 MB 大小,包含大约 2100 万行,每行 18 列。这三个文件被放入taxi/文件夹中。

这些示例可 Google Colab[1]上找到。此处报告的时间来自此环境以实现可重复性。

读取多个 Parquet 文件

首先,我们查看数据集中的一些行。文件taxi/夹中有三个 Parquet 文件。DuckDB 支持 globbing 语法[2],这允许它同时查询所有三个文件。

con.execute("""
   SELECT *
   FROM 'taxi/*.parquet'
   LIMIT 5"
"").df()
pickup_atdropoff_atpassenger_counttrip_distancerate_code_id
2019-04-01 00:04:092019-04-01 00:06:3510.51
2019-04-01 00:22:452019-04-01 00:25:4310.71
2019-04-01 00:39:482019-04-01 01:19:39110.91
2019-04-01 00:35:322019-04-01 00:37:1110.21
2019-04-01 00:44:052019-04-01 00:57:5814.81

尽管查询从三个(相当大的)Parquet 文件中选择了所有列,但查询会立即完成。这是因为 DuckDB 以流式方式处理 Parquet 文件,并且在读取前几行后立即停止读取 Parquet 文件,因为这都是满足查询所必需的。

如果我们尝试在 Pandas 中做同样的事情,我们会发现它并不是那么简单,因为 Pandas 不能在一次调用中读取多个 Parquet 文件。我们首先必须使用pandas.concat将三个 Parquet 文件连接在一起:

import pandas
import glob
df = pandas.concat(
 [pandas.read_parquet(file)
  for file
  in glob.glob('taxi/*.parquet')])
print(df.head(5))

以下是这两个查询的时间。

SystemTime(s)
DuckDB0.015
Pandas12.300

Pandas 需要更长的时间来完成这个查询。这是因为 Pandas 不仅需要完整读取三个 Parquet 文件中的每一个,还必须将这三个单独的 Pandas DataFrame 连接在一起。

连接成单个文件

我们可以通过从三个较小的部分创建一个大 Parquet 文件来解决连接问题。我们可以使用pyarrow,它支持读取多个 Parquet 文件并将它们流式传输到单个大文件中。请注意,pyarrow 的 parquet reader 与 Pandas 内部使用的 parquet reader 完全相同。

import pyarrow.parquet as pq
# concatenate all three parquet files
pq.write_table(pq.ParquetDataset('taxi/').read(), 'alltaxi.parquet', row_group_size=100000)

DuckDB[3] 还支持使用 COPY 语句写 Parquet 文件[4]

查询大文件

现在让我们重复之前的实验,但使用单个文件。

# DuckDB
con.execute("""
   SELECT *
   FROM 'alltaxi.parquet'
   LIMIT 5"""
).df()

# Pandas
pandas.read_parquet('alltaxi.parquet')
      .head(5)
SystemTime(s)
DuckDB0.02
pandas7.50

我们可以看到 Pandas 的性能比以前更好,因为避免了连接。但是,仍然需要将整个文件读入内存,这会占用大量时间和内存。

对于 DuckDB,在查询中需要读取多少 Parquet 文件并不重要。

计数行

现在假设我们想弄清楚我们的数据集中有多少行。我们可以使用以下代码来做到这一点:

# DuckDB
con.execute("""
   SELECT COUNT(*)
   FROM 'alltaxi.parquet'
"""
).df()

# Pandas
len(pandas.read_parquet('alltaxi.parquet'))
SystemTime(s)
DuckDB0.015
Pandas7.500

DuckDB 可以非常快速地完成查询,因为它会自动识别需要从 Parquet 文件中读取的内容并最大限度地减少所需的读取。Pandas 必须再次读取整个文件,这会导致它花费与前一个查询相同的时间。

对于这个查询,我们可以通过手动优化来提高 Pandas 的时间。为了获得计数,我们只需要文件中的一列。通过在read_parquet命令中手动指定要读取的单个列,我们可以获得相同的结果,但速度要快得多。

len(pandas.read_parquet('alltaxi.parquet', columns=['vendor_id']))
SystemTime(s)
DuckDB0.015
Pandas7.500
Pandas(优化)1.200

虽然这要快得多,但这仍然需要一秒钟以上的时间,因为必须将整个vendor_id列作为 Pandas 列读入内存才能计算行数。

过滤行

通常使用某种过滤来只查看数据集的有趣部分。例如,假设我们想知道在 2019 年 6 月 30 日之后发生了多少辆出租车。我们可以在 DuckDB 中使用以下查询来做到这一点:

con.execute("""
   SELECT COUNT(*)
   FROM 'alltaxi.parquet'
   WHERE pickup_at > '2019-06-30'
"""
).df()

查询完成45ms并产生以下结果:

count
167022

在 Pandas 中,我们可以使用简单的方法执行相同的操作。

# pandas naive
len(pandas.read_parquet('alltaxi.parquet')
          .query("pickup_at > '2019-06-30'"))

然而,这再次将整个文件读入内存,导致此查询耗时7.5s. 通过手动下推,我们可以将其降低到0.9s. 仍然明显高于 DuckDB。

# pandas projection pushdown
len(pandas.read_parquet('alltaxi.parquet', columns=['pickup_at'])
          .query("pickup_at > '2019-06-30'"))

然而,pyarrow的 parquet reader 还允许我们在扫描中执行过滤器下推。一旦我们添加了这一点,我们就会以更有竞争力的70ms方式完成查询。

len(pandas.read_parquet('alltaxi.parquet', columns=['pickup_at'], filters=[('pickup_at''>''2019-06-30')]))
系统Time(s)
DuckDB0.05
Pandas7.50
Pandas(下推)0.90
Pandas(过滤器下推)0.07

这表明这里的结果并不是因为 DuckDB 的 parquet reader 比pyarrow的 Parquet reader 快。DuckDB 在这些查询上表现更好的原因是它的优化器会自动从 SQL 查询中提取所有必需的列和过滤器,然后在 Parquet reader 中自动使用,无需手动操作。

有趣的是,pyarrow的 Parquet reader 和 DuckDB 都比在 Pandas 中对 DataFrame 原生执行此操作要快得多。

# read the entire parquet file into Pandas
df = pandas.read_parquet('alltaxi.parquet')
# run the query natively in Pandas
# note: we only time this part
print(len(df[['pickup_at']].query("pickup_at > '2019-06-30'")))
系统Time(s)
DuckDB0.05
Pandas7.50
Pandas(下推)0.90
Pandas(过滤器下推)0.07
Pandas(原生)0.26

聚合

最后让我们看一个更复杂的查询,聚合。假设我们要计算每位乘客的乘车次数。使用 DuckDB 和 SQL,它看起来像这样:

con.execute("""
    SELECT passenger_count, COUNT(*)
    FROM 'alltaxi.parquet'
    GROUP BY passenger_count"""
).df()

220ms完成并产生以下结果:

passenger_countcount
0408742
115356631
23332927
3944833
4439066
5910516
6546467
7106
872
964

对于讨厌 SQL (为啥?因为不会写?)的人,DuckDB 还提供了关系 API,允许对查询进行更 Python 式的声明。这是与上面的 SQL 查询等价的,它提供了完全相同的结果和性能:

con.from_parquet('alltaxi.parquet')
   .aggregate('passenger_count, count(*)')
   .df()

现在作为比较,让我们以与之前相同的方式在 Pandas 中运行相同的查询。

# naive
pandas.read_parquet('alltaxi.parquet')
      .groupby('passenger_count')
      .agg({'passenger_count' : 'count'})

# projection pushdown
pandas.read_parquet('alltaxi.parquet', columns=['passenger_count'])
      .groupby('passenger_count')
      .agg({'passenger_count' : 'count'})

# native (parquet file pre-loaded into memory)
df.groupby('passenger_count')
  .agg({'passenger_count' : 'count'})
SystemTime(s)
DuckDB0.22
Pandas7.50
Pandas(下推)0.58
Pandas(原生)0.51

我们可以看到 DuckDB 在所有三个场景中都比 Pandas 更快,无需执行任何手动优化,也无需将 Parquet 文件完整加载到内存中。

结论

DuckDB 可以直接在 Parquet 文件之上高效地运行查询,而无需初始加载阶段。系统将自动利用 Parquet 的所有高级功能来加速查询执行。

DuckDB 是一个免费的开源数据库管理系统(MIT 许可)。它旨在成为用于分析的 SQLite,并提供一个快速高效的零外部依赖关系的数据库系统。它不仅适用于 Python,还适用于 C/C++、R、Java 等。

参考资料

[1]

Google Colab: https://colab.research.google.com/drive/1e1beWqYOcFidKl2IxHtxT5s9i_6KYuNY

[2]

DuckDB 支持 globbing 语法: https://duckdb.org/docs/data/parquet

[3]

DuckDB: https://duckdb.org/docs/data/parquet#writing-to-parquet-files

[4]

Parquet 文件: https://duckdb.org/docs/data/parquet#writing-to-parquet-files


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存